package org.databandtech.flinkstreaming;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateApp {

	public static void main(String[] args) {
		// 创建Flink执行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.fromElements(Tuple2.of("id001", 3L),Tuple2.of("id001", 5L),Tuple2.of("id001", 7L),Tuple2.of("id002", 6L),Tuple2.of("id002", 42L),Tuple2.of("id003", 27L))
        .keyBy(e -> e.f0)
        .flatMap(new StateCountWindow())
        .print();
		
        try {
			env.execute("CountApp");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
